use super::{cli::de, error::Error, vg_pool::VolumeGroup, CmnQueryArgs};
use crate::{
    bdev::PtplFileOps,
    bdev_api::{bdev_create, BdevError},
    core::{NvmfShareProps, Protocol, PtplProps, Share, UntypedBdev, UpdateProps},
    lvm::{
        cli::LvmCmd,
        property::{Property, PropertyType},
    },
    pool_backend::PoolBackend,
};

use std::{
    ops::{Deref, DerefMut},
    pin::Pin,
};

/// Different list options for a logical volume.
#[derive(Default, Debug)]
pub(crate) struct QueryArgs {
    /// Pertaining the volume group parent.
    vg: super::vg_pool::QueryArgs,
    /// Pertaining the logical volume itself.
    lv: CmnQueryArgs,
    /// Our lv's are "special", as in we set the uuid to the name, and add the
    /// name as a lv_tag (LVM_TAG.lv.name=name)
    regular_lv: bool,
}
impl QueryArgs {
    /// Builder-like creating a default `Self`.
    pub(crate) fn new() -> Self {
        Self::default()
    }
    /// Add the VG query args.
    pub(crate) fn with_vg(self, vg: CmnQueryArgs) -> Self {
        Self {
            vg: vg.into(),
            ..self
        }
    }
    /// Add the LV query args.
    pub(crate) fn with_lv(self, lv: CmnQueryArgs) -> Self {
        Self { lv, ..self }
    }
    /// Get a comma-separated list of query selection args.
    /// todo: should be Display trait?
    pub(super) fn query(&self) -> Result<String, Error> {
        let mut select = self.vg.query()?;
        let args = &self.lv;

        if self.regular_lv {
            if let Some(lv_name) = &args.name {
                super::is_alphanumeric("lv_name", lv_name)?;
                select.push_str(&format!("lv_name={lv_name},"));
            }
            if let Some(lv_uuid) = &args.uuid {
                super::is_alphanumeric("lv_uuid", lv_uuid)?;
                select.push_str(&format!("lv_uuid={lv_uuid},"));
            }
        } else {
            if let Some(name) = &args.name {
                super::is_alphanumeric("name", name)?;
                select.push_str(&format!(
                    "lv_tags={},",
                    Property::LvName(name.to_string()).tag()
                ));
            }
            if let Some(lv_name) = &args.uuid {
                super::is_alphanumeric("lv_name", lv_name)?;
                select.push_str(&format!("lv_name={lv_name},"));
            }
        }

        if let Some(lv_tag) = &args.tag {
            select.push_str(&format!("lv_tags={lv_tag},"));
        }
        Ok(select)
    }
}

/// A Logical Volume (LV) is a virtual block device that can be used by the
/// system or applications.
/// Each block of data in an LV is stored on one or more PV in the VG, according
/// to algorithms implemented by Device Mapper (DM) in the kernel.
#[derive(Debug, Clone, Deserialize)]
pub struct LogicalVolume {
    /// The LV uuid.
    /// This is generated by LVM and we cannot specify a particular value.
    #[allow(dead_code)]
    lv_uuid: String,
    /// The LV name.
    /// We set this to the Replica UUID.
    lv_name: String,
    /// The name of the parent VG.
    vg_name: String,
    /// The uuid of the parent VG.
    /// This is generated by LVM and we cannot specify a particular value.
    vg_uuid: String,
    #[serde(deserialize_with = "de::number_from_string")]
    vg_extent_size: u64,
    /// The LV path.
    /// The logical volume full path name is /dev/<vg_name>/<lv_name>.
    #[serde(rename = "lv_path")]
    path: String,
    /// Size of the LV.
    #[serde(rename = "lv_size")]
    #[serde(deserialize_with = "de::number_from_string")]
    size: u64,
    /// Tags of the LV converted to our properties.
    #[serde(rename = "lv_tags")]
    #[serde(deserialize_with = "de::comma_separated")]
    tags: Vec<Property>,
    /// Tags of the parent VG converted to our properties.
    #[serde(rename = "vg_tags")]
    #[serde(deserialize_with = "de::comma_separated")]
    vg_tags: Vec<Property>,

    #[serde(skip)]
    runtime: RunLogicalVolume,
}

/// Runtime settings for the LogicalVolume.
#[derive(Debug, Default, Clone)]
pub struct RunLogicalVolume {
    /// The protocol of the SPDK Bdev which is created against the lv_path.
    /// This is a mirror of the equivalent LV property tag.
    share: Protocol,
    /// LV's are created with name as a tag, so we have to probe the tags and
    /// set name if we find the tag.
    /// This is a mirror of the equivalent LV property tag.
    name: Option<String>,
    /// The entity id which owns this resource, eg: the parent volume.
    /// This is a mirror of the equivalent LV property tag.
    entity_id: Option<String>,

    /// SPDK Bdev parameters which are needed by LVM.
    bdev: Option<BdevOpts>,
    /// When modifying the properties, we may fail to update the LVM LV tags.
    /// In such case, set the dirty flag.
    tags_dirty: bool,
}

/// Runtime settings for the LogicalVolume.
#[derive(Debug, Default, Clone)]
pub(crate) struct BdevOpts {
    /// The share URI of the SPDK Bdev which is created against the lv_path.
    share_uri: Option<String>,
    /// The URI of the SPDK Bdev which is created against the lv_path.
    open_uri: String,
    allowed_hosts: Vec<String>,
    share: Protocol,
    size: u64,
}
impl From<UntypedBdev> for BdevOpts {
    fn from(bdev: UntypedBdev) -> Self {
        Self {
            share_uri: bdev.share_uri(),
            open_uri: bdev.bdev_uri_original_str().unwrap_or_default(),
            allowed_hosts: bdev.allowed_hosts(),
            share: bdev.shared().unwrap_or_default(),
            size: bdev.size_in_bytes(),
        }
    }
}
impl BdevOpts {
    fn update_from(&mut self, to: Self) {
        self.share_uri = to.share_uri;
        self.allowed_hosts = to.allowed_hosts;
        self.share = to.share;
    }
    /// Get a reference to the original bdev uri.
    pub(crate) fn uri(&self) -> &str {
        &self.open_uri
    }
}

impl Deref for LogicalVolume {
    type Target = RunLogicalVolume;

    fn deref(&self) -> &Self::Target {
        &self.runtime
    }
}
impl DerefMut for LogicalVolume {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.runtime
    }
}

#[derive(Debug, Deserialize)]
struct LogicalVolumeList {
    lv: Vec<LogicalVolume>,
}

impl LogicalVolume {
    /// Create a new logical volume for the given vg uuid.
    pub(crate) async fn create(
        vg_uuid: &str,
        name: &str,
        size: u64,
        uuid: &str,
        thin: bool,
        entity_id: &Option<String>,
        share: Protocol,
    ) -> Result<LogicalVolume, Error> {
        let pool = VolumeGroup::lookup(CmnQueryArgs::ours().uuid(vg_uuid)).await?;
        pool.create_lvol(name, size, uuid, thin, entity_id, share)
            .await?;
        Self::lookup(
            &QueryArgs::new()
                .with_lv(CmnQueryArgs::ours().uuid(uuid))
                .with_vg(CmnQueryArgs::ours().uuid(vg_uuid)),
        )
        .await
    }

    /// Lookup a single logical volume.
    pub(crate) async fn lookup(args: &QueryArgs) -> Result<Self, Error> {
        let vgs = Self::list(args).await?;
        vgs.into_iter().next().ok_or(Error::LvNotFound {
            query: args.query().unwrap_or_else(|e| e.to_string()),
        })
    }

    /// List logical volumes using the provided options as query criteria.
    /// All lv's are imported and all imports must succeed.
    pub(crate) async fn list(opts: &QueryArgs) -> Result<Vec<LogicalVolume>, Error> {
        let mut g_error = Ok(());
        let mut lvs = Self::fetch(opts).await?;
        for lv in &mut lvs {
            match lv.import().await {
                Ok(_) => {}
                Err(error) => {
                    tracing::error!(
                        "Failed to import {vg_name}/{lv_name}: {error}",
                        vg_name = lv.vg_name,
                        lv_name = lv.lv_name,
                    );
                    g_error = Err(error);
                }
            }
        }
        g_error?;
        Ok(lvs)
    }

    /// Fetch logical volumes using the provided options as query criteria.
    async fn fetch(opts: &QueryArgs) -> Result<Vec<LogicalVolume>, Error> {
        let mut args = vec![
            "--report-format=json",
            "--options=lv_name,lv_uuid,lv_size,lv_path,lv_tags,vg_name,vg_uuid,vg_tags,vg_extent_size",
            "--units=b",
            "--nosuffix",
            "-q",
        ];
        let select = opts.query()?;
        let select_query = format!("--select={select}");
        if !select.is_empty() {
            args.push(select_query.trim_end_matches(','));
        }

        let mut report: LogicalVolumeList =
            LvmCmd::lv_list().args(args.as_slice()).report().await?;

        report.lv.iter_mut().for_each(|lv| lv.import_attrs());

        Ok(report.lv)
    }

    /// Destroy the logical volume.
    /// This unloads the lvol from the Bdev module first and then proceeds to
    /// remove the lv from the parent volume group.
    pub(crate) async fn destroy(mut self) -> Result<(), Error> {
        self.export_bdev().await?;
        let ptpl = self.ptpl();
        self.remove().await?;
        ptpl.destroy().ok();
        Ok(())
    }

    /// Import the Logical Volume.
    /// This is a required step after creating the LV, which adds the name and
    /// the share protocol as property tags.
    /// The LV is then imported as an spdk BDEV, which allows it to be shared
    /// via nvmf or open locally (ex: by the nexus).
    pub(crate) async fn import(&mut self) -> Result<(), Error> {
        if !self.ours() || !self.vg_ours() {
            return Ok(());
        }
        self.import_bdev().await
    }

    /// Resize the logical volume AND the SPDK Bdev to the given size.
    /// > Note: If the SPDK Bdev fails to be resized, then the LV size is
    /// > reverted back.
    /// # Warning: If the SPDK Bdev is not loaded, then it is also not resized
    /// and the operation still succeeds.
    pub(crate) async fn resize(&mut self, size: u64) -> Result<(), Error> {
        let prev_size = self.size;
        self.resize_lv(size).await?;

        if let Err(error) = self.resize_bdev(size).await {
            self.resize_lv(prev_size).await.ok();
            return Err(error);
        }

        Ok(())
    }

    /// Export out the SPDK bdev.
    /// The bdev is unshared (if shared) and closed, allowing the logical volume
    /// to be closed and/or destroyed.
    pub(super) async fn export_bdev(&mut self) -> Result<(), Error> {
        let Ok(bdev) = self.bdev_opts() else {
            // Nothing to do if the bdev was not setup...
            return Ok(());
        };
        let uri = bdev.open_uri.clone();
        crate::spdk_run!(async move {
            if let Ok(mut bdev) = Self::bdev(&uri) {
                // todo: must we error if we can't unshare?
                Self::bdev_unshare(&mut bdev).await?;
            }

            let bdev = crate::bdev::uri::parse(&uri).unwrap();
            match bdev.destroy().await {
                Ok(()) | Err(BdevError::BdevNotFound { .. }) => Ok(()),
                Err(source) => Err(Error::BdevExport { source }),
            }
        })?;
        self.bdev = None;
        Ok(())
    }

    /// Remove (delete really) the logical volume from the parent volume group.
    /// # Note: if the Bdev is not unloaded first, then lv removal will be reject
    /// by the lvm driver.
    async fn remove(self) -> Result<(), Error> {
        LvmCmd::lv_remove().arg(self.path).arg("-y").run().await?;

        info!("lvm volume {} deleted", self.lv_name);
        Ok(())
    }

    /// Resize only the logical volume to the given size (not the SPDK Bdev).
    async fn resize_lv(&mut self, size: u64) -> Result<(), Error> {
        let size_str = format!("-L{size}b");
        match LvmCmd::lv_resize()
            .arg(&self.path)
            .arg(size_str)
            .arg("-y")
            .run()
            .await
        {
            Ok(()) => Ok(()),
            // not great, but not sure how else to map the error otherwise...
            Err(Error::LvmBinErr { error, .. }) if error.starts_with("Insufficient free space") => {
                Err(Error::NoSpace { error })
            }
            Err(error) => Err(error),
        }
    }

    /// Resize the logical volume's SPDK Bdev to the given size.
    async fn resize_bdev(&mut self, size: u64) -> Result<(), Error> {
        let Ok((bdev, uri)) = self.bdev_mut_uri() else {
            // If the Bdev is not open, we don't need to resize it!?
            return Ok(());
        };
        let (rc, size) = crate::spdk_run!(async move {
            let mut bdev = Self::bdev(&uri)?;

            let blk_cnt = size / bdev.block_len() as u64;

            use spdk_rs::libspdk::spdk_bdev_notify_blockcnt_change;
            let rc =
                unsafe { spdk_bdev_notify_blockcnt_change(bdev.unsafe_inner_mut_ptr(), blk_cnt) };
            Ok((rc, bdev.size_in_bytes()))
        })?;
        if rc != 0 {
            error!("failed to notify block cnt change on lvol: {rc}");
            return Err(Error::BdevShareUri {});
        }
        bdev.size = size;
        Ok(())
    }

    async fn sync_share_opts(&mut self) -> Result<(), Error> {
        let Some(opts) = &self.bdev else {
            return Ok(());
        };
        let properties = vec![
            Property::LvShare(opts.share),
            Property::LvAllowedHosts(opts.allowed_hosts.clone()),
        ];

        self.set_properties(properties).await
    }
    async fn sync_share_protocol(&mut self, protocol: Protocol) -> Result<(), Error> {
        self.set_property(Property::LvShare(protocol)).await?;
        self.share = protocol;
        Ok(())
    }
    async fn build_set_properties_args(&self, properties: Vec<Property>) -> Vec<String> {
        let mut args = Vec::new();
        for property in properties {
            args.extend(self.build_set_property_args(property).await);
        }
        args
    }
    async fn build_set_property_args(&self, property: Property) -> Vec<String> {
        let existing = self.properties(&property.type_());
        let mut add = true;
        let mut args = existing
            .iter()
            .flat_map(|old| match old == &property {
                true => {
                    add = false;
                    None
                }
                false => Some(old.del()),
            })
            .collect::<Vec<_>>();
        if add {
            args.push(property.add());
        }
        args
    }
    async fn set_properties(&mut self, properties: Vec<Property>) -> Result<(), Error> {
        let args = self.build_set_properties_args(properties).await;
        if args.is_empty() {
            return Ok(());
        }
        let result = LvmCmd::lv_change().args(args).arg(&self.path).run().await;
        if result.is_err() {
            self.tags_dirty = true;
        }
        result
    }
    pub(crate) async fn set_property(&mut self, property: Property) -> Result<(), Error> {
        self.set_properties(vec![property]).await
    }

    async fn bdev_sync_props(
        bdev: &mut UntypedBdev,
        protocol: Protocol,
        ptpl: impl PtplFileOps,
        allowed_hosts: Vec<String>,
    ) -> Result<(), Error> {
        match protocol {
            Protocol::Nvmf => {
                let props = NvmfShareProps::new()
                    .with_allowed_hosts(allowed_hosts)
                    .with_ptpl(ptpl.create().map_err(|source| Error::BdevShare {
                        source: crate::core::CoreError::Ptpl {
                            reason: source.to_string(),
                        },
                    })?);
                Self::bdev_share_nvmf(bdev, Some(props)).await?;
            }
            Protocol::Off => {
                Self::bdev_unshare(bdev).await?;
            }
        }

        Ok(())
    }

    /// Import the Logical Volume by loading it via an SPDK bdev using AIO.
    /// todo: Allow using either aio or uring.
    /// todo: Test performance.
    async fn import_bdev(&mut self) -> Result<(), Error> {
        let disk_uri = format!("aio://{}?uuid={}", self.path, self.uuid());

        let allowed_hosts = self
            .property(&PropertyType::LvAllowedHosts)
            .and_then(|p| p.LvAllowedHosts())
            .unwrap_or_default();

        let share = self.share;
        let ptpl = self.ptpl();

        let bdev = crate::spdk_run!(async move {
            if crate::core::UntypedBdev::lookup_by_name(&disk_uri).is_none() {
                bdev_create(&disk_uri)
                    .await
                    .map_err(|source| Error::BdevImport { source })?;
            }

            let mut bdev = Self::bdev(&disk_uri)?;
            Self::bdev_sync_props(&mut bdev, share, ptpl, allowed_hosts).await?;

            Ok(BdevOpts::from(bdev))
        })?;
        self.bdev = bdev.into();
        Ok(())
    }
    fn bdev_mut(&mut self) -> Result<&mut BdevOpts, Error> {
        let Some(bdev) = self.bdev.as_mut() else {
            // Nothing to do if the bdev was not setup...
            return Err(Error::BdevMissing {});
        };
        Ok(bdev)
    }
    fn bdev_mut_uri(&mut self) -> Result<(&mut BdevOpts, String), Error> {
        let bdev = self.bdev_mut()?;
        let uri = bdev.open_uri.clone();
        Ok((bdev, uri))
    }
    pub(crate) fn bdev_opts(&self) -> Result<&BdevOpts, Error> {
        let Some(uri) = self.bdev.as_ref() else {
            // Nothing to do if the bdev was not setup...
            return Err(Error::BdevMissing {});
        };
        Ok(uri)
    }

    fn import_attrs(&mut self) {
        self.name = self
            .property(&PropertyType::LvName)
            .and_then(|p| p.LvName());
        self.share = self
            .property(&PropertyType::LvShare)
            .and_then(|p| p.LvShare())
            .unwrap_or_default();
        self.entity_id = self
            .property(&PropertyType::LvEntityId)
            .and_then(|p| p.LvEntityId());
        self.tags_dirty = false;
        tracing::trace!("{self:?}");
    }

    fn property(&self, property: &PropertyType) -> Option<Property> {
        self.tags
            .iter()
            .find(|tag| tag.key() == property.value())
            .cloned()
    }
    fn properties(&self, property: &PropertyType) -> Vec<Property> {
        self.tags
            .iter()
            .filter(|tag| tag.key() == property.value())
            .cloned()
            .collect()
    }
    /// Get the name of the volume group where this logical volume resides.
    pub(crate) fn vg_name(&self) -> &str {
        &self.vg_name
    }
    /// Get the uuid of the volume group where this logical volume resides.
    pub(crate) fn vg_uuid(&self) -> &str {
        &self.vg_uuid
    }
    /// Get the extent size of the volume group where this logical volume
    /// resides.
    pub(crate) fn extent_size(&self) -> u64 {
        self.vg_extent_size
    }
    /// The "apparent" size of the Logical Volume.
    /// If the SPDK Bdev is not open, then it's just the lv size.
    /// If it's open, then it's the smallest between the bdev and the lv.
    pub(crate) fn size(&self) -> u64 {
        match self.bdev.as_ref().map(|b| b.size) {
            None => self.size,
            Some(b_size) => self.size.min(b_size),
        }
    }
    /// Check the lv is thin provisioned (otherwise it's thick).
    pub(crate) fn thin(&self) -> bool {
        false
    }
    /// LV's are created with name=replica uuid, so we have to "swap" here.
    pub(crate) fn uuid(&self) -> &str {
        &self.lv_name
    }
    /// Check if this LV is owned by us.
    pub(crate) fn ours(&self) -> bool {
        self.tags.contains(&Property::Lvm)
    }
    /// Check if the parent VG is owned by us.
    pub(crate) fn vg_ours(&self) -> bool {
        self.vg_tags.contains(&Property::Lvm)
    }
    /// Get the name of the logical volume.
    pub(crate) fn name(&self) -> &Option<String> {
        &self.name
    }
    /// Get the entity id of the resource which owns this logical volume.
    pub(crate) fn entity_id(&self) -> Option<&String> {
        self.entity_id.as_ref()
    }
    /// Get the URI of the SPDK bdev which is layered on top of the lv.
    pub(crate) fn uri(&self) -> Option<&String> {
        self.bdev.as_ref()?.share_uri.as_ref()
    }
    /// Get the SPDK bdev allowed hosts.
    pub(crate) fn allowed_hosts(&self) -> Option<&Vec<String>> {
        self.bdev.as_ref().map(|b| &b.allowed_hosts)
    }
}

/// These should be part of the Share trait but there are a few things that make
/// it difficult:
/// 1. It uses Pin which is not required for our lvm
/// 2. Some of the functions part of the Share trait are blocking but we're
///    currently using a trampoline between tokio and spdk reactors which makes
///    this difficult.
///
/// todo: would reactor block on help here? we could go the
///    other way and use trampoline from spdk to tokio, but then also hit the
///    problem?
impl LogicalVolume {
    pub(crate) fn bdev(uri: &str) -> Result<UntypedBdev, Error> {
        UntypedBdev::get_by_name(uri).map_err(|_| Error::BdevMissing {})
    }

    async fn bdev_share_nvmf(
        bdev: &mut UntypedBdev,
        props: Option<NvmfShareProps>,
    ) -> Result<String, Error> {
        let mut bdev = Pin::new(bdev);
        match bdev.shared() {
            Some(Protocol::Nvmf) => {
                bdev.as_mut()
                    .update_properties(props.map(Into::into))
                    .await
                    .map_err(|source| Error::BdevShare { source })?;
                bdev.share_uri().ok_or(Error::BdevShareUri {})
            }
            Some(Protocol::Off) | None => bdev
                .share_nvmf(props)
                .await
                .map_err(|source| Error::BdevShare { source }),
        }
    }
    async fn bdev_unshare(bdev: &mut UntypedBdev) -> Result<Option<String>, Error> {
        let mut bdev = Pin::new(bdev);
        match bdev.shared() {
            Some(Protocol::Nvmf) => {
                bdev.as_mut()
                    .unshare()
                    .await
                    .map_err(|source| Error::BdevUnshare { source })?;
            }
            Some(Protocol::Off) | None => {}
        }
        Ok(bdev.share_uri())
    }

    /// Share the lvol via nvmf.
    pub(crate) async fn share_nvmf(
        &mut self,
        props: Option<NvmfShareProps>,
    ) -> Result<String, Error> {
        let (bdev, uri) = self.bdev_mut_uri()?;

        let (nqn, bdev_opts) = crate::spdk_run!(async move {
            let mut bdev = Self::bdev(&uri)?;
            let nqn = Self::bdev_share_nvmf(&mut bdev, props).await?;
            Ok((nqn, BdevOpts::from(bdev)))
        })?;

        // todo: add per-bdev locking... to ensure we don't have concurrent spdk
        //  code...
        // todo: BUG: wrong share URI:
        // nqn.2019-05.io.openebs:/dev/sdavg/
        // d514adef-f4ce-4575-b10e-eb301de2cf99 vs
        // nqn.2019-05.io.openebs:d514adef-f4ce-4575-b10e-eb301de2cf99

        bdev.update_from(bdev_opts);
        self.sync_share_opts().await?;

        info!("{:?}: shared as NVMF", self);
        Ok(nqn)
    }

    /// Update the lvol share properties.
    pub(crate) async fn update_share_props<P: Into<Option<UpdateProps>>>(
        &mut self,
        props: P,
    ) -> Result<(), Error> {
        let (bdev, uri) = self.bdev_mut_uri()?;
        let props = props.into();
        let bdev_opts = crate::spdk_run!(async move {
            let mut bdev = Self::bdev(&uri)?;
            Pin::new(&mut bdev)
                .update_properties(props)
                .await
                .map_err(|e| Error::UpdateProps {
                    source: e,
                    name: bdev.name().to_string(),
                })?;
            Ok(BdevOpts::from(bdev))
        })?;
        bdev.update_from(bdev_opts);
        self.sync_share_opts().await?;
        Ok(())
    }

    /// Unshare the nvmf target.
    pub(crate) async fn unshare(&mut self) -> Result<(), Error> {
        let (bdev, uri) = self.bdev_mut_uri()?;
        let share = crate::spdk_run!(async move {
            let mut bdev = Self::bdev(&uri)?;
            Self::bdev_unshare(&mut bdev).await
        })?;

        bdev.share_uri = share;
        bdev.share = Protocol::Off;
        self.sync_share_protocol(Protocol::Off).await?;

        info!("{self:?}: unshared");
        Ok(())
    }

    /// Get the shared protocol, if any setup.
    pub(crate) fn share_proto(&self) -> Option<Protocol> {
        self.bdev.as_ref().map(|_| self.share)
    }

    /// Get a `PtplFileOps` from `&self`.
    pub(crate) fn ptpl(&self) -> impl PtplFileOps {
        LvolPtpl::from(self)
    }
}

pub struct LvolPtpl {
    vg: super::vg_pool::VgPtpl,
    uuid: String,
}
impl LvolPtpl {
    fn vg(&self) -> &super::vg_pool::VgPtpl {
        &self.vg
    }
    fn uuid(&self) -> &str {
        &self.uuid
    }
}
impl From<&LogicalVolume> for LvolPtpl {
    fn from(lvol: &LogicalVolume) -> Self {
        Self {
            vg: VolumeGroup::vg_ptpl(lvol.vg_name()),
            uuid: lvol.uuid().to_string(),
        }
    }
}

impl PtplFileOps for LvolPtpl {
    fn create(&self) -> Result<Option<PtplProps>, std::io::Error> {
        if let Some(path) = self.path() {
            self.vg().create()?;
            if let Some(parent) = path.parent() {
                std::fs::create_dir_all(parent)?;
            }
            return Ok(Some(PtplProps::new(path)));
        }
        Ok(None)
    }

    fn destroy(&self) -> Result<(), std::io::Error> {
        if let Some(path) = self.path() {
            std::fs::remove_file(path)?;
        }
        Ok(())
    }

    fn subpath(&self) -> std::path::PathBuf {
        self.vg()
            .subpath()
            .join("lv/")
            .join(self.uuid())
            .with_extension("json")
    }
}

impl crate::core::LogicalVolume for LogicalVolume {
    fn name(&self) -> String {
        self.name().clone().unwrap_or_default()
    }

    fn uuid(&self) -> String {
        self.lv_name.clone()
    }

    fn pool_name(&self) -> String {
        self.vg_name().into()
    }

    fn pool_uuid(&self) -> String {
        self.vg_uuid().into()
    }

    fn entity_id(&self) -> Option<String> {
        self.entity_id().cloned()
    }

    fn is_thin(&self) -> bool {
        self.thin()
    }

    fn is_read_only(&self) -> bool {
        false
    }

    fn size(&self) -> u64 {
        self.size
    }

    fn committed(&self) -> u64 {
        self.size
    }

    fn allocated(&self) -> u64 {
        self.size()
    }

    fn usage(&self) -> crate::core::logical_volume::LvolSpaceUsage {
        crate::core::logical_volume::LvolSpaceUsage {
            capacity_bytes: self.size(),
            allocated_bytes: self.allocated(),
            cluster_size: self.extent_size(),
            // todo: missing this information
            num_clusters: 0,
            num_allocated_clusters: 0,
            allocated_bytes_snapshots: 0,
            num_allocated_clusters_snapshots: 0,
            allocated_bytes_snapshot_from_clone: None,
        }
    }

    fn is_snapshot(&self) -> bool {
        false
    }

    fn is_clone(&self) -> bool {
        false
    }

    fn backend(&self) -> PoolBackend {
        PoolBackend::Lvm
    }

    fn snapshot_uuid(&self) -> Option<String> {
        None
    }

    fn share_protocol(&self) -> Protocol {
        self.share
    }

    fn bdev_share_uri(&self) -> Option<String> {
        self.uri().cloned()
    }

    fn nvmf_allowed_hosts(&self) -> Vec<String> {
        self.allowed_hosts().cloned().unwrap_or_default()
    }

    fn encrypted(&self) -> bool {
        // until we support lvm backend with encryption
        false
    }
}

#[async_trait::async_trait(?Send)]
impl Share for LogicalVolume {
    type Error = Error;
    type Output = String;

    async fn share_nvmf(
        mut self: Pin<&mut Self>,
        props: Option<NvmfShareProps>,
    ) -> Result<Self::Output, Self::Error> {
        self.share_nvmf(props).await
    }
    fn create_ptpl(&self) -> Result<Option<PtplProps>, Self::Error> {
        self.ptpl().create().map_err(|source| Error::BdevShare {
            source: crate::core::CoreError::Ptpl {
                reason: source.to_string(),
            },
        })
    }

    async fn update_properties<P: Into<Option<UpdateProps>>>(
        mut self: Pin<&mut Self>,
        props: P,
    ) -> Result<(), Self::Error> {
        self.as_mut().update_share_props(props).await
    }

    async fn unshare(mut self: Pin<&mut Self>) -> Result<(), Self::Error> {
        self.unshare().await
    }

    fn shared(&self) -> Option<Protocol> {
        self.share_proto()
    }

    fn share_uri(&self) -> Option<String> {
        self.uri().cloned()
    }

    fn allowed_hosts(&self) -> Vec<String> {
        self.allowed_hosts().cloned().unwrap_or_default()
    }

    fn bdev_uri(&self) -> Option<url::Url> {
        None
    }
    fn bdev_uri_original(&self) -> Option<url::Url> {
        None
    }
}
